Amazon Transcribeのジョブ実行結果をS3 Events/SNS/Lambdaでイベントドリブンに処理する
従来の Amazon Transcribe は、文字起こし結果を AWS の管理する S3 バケットに出力しました。今後は、ユーザーの管理する S3 バケットにも出力可能になりました。
ユーザーの管理する S3 バケットに出力することにより
- 文字起こし結果の権限・ライフサイクルの管理
- S3 への書き込みイベントをもとに処理を呼び出し
といったことが可能になります。
本ブログでは、Transcribe ジョブ結果がユーザーの管理する S3 バケットに書き込まれたあと、 S3 → SNS → Lambda と連携し、ジョブ結果ファイルを Lambda で処理できるように実装します。
文字起こしパイプラインイメージ図
イベントドリブンに
- S3 バケットに音源ファイルが書き込まれたら、文字起こしジョブを開始
- S3 バケットに文字起こし結果が出力されたら(=ジョブが完了)、文字起こし結果を処理
のようなことを行うと、次のようなパイプラインが考えられます。
S3 イベントから直接 Lambda を呼び出さず、SNS を挟んでいるのは
- S3 のイベント呼び出し条件はオーバーラップできないという制約がある(バケット全体と特定のprefixなど)
- Fanout しやすい
ためです。
1. transcription 出力用 S3 バケットの作成
Amazon Transcribe と同じリージョンに S3 バケットを作成します。
Amazon Transcribe(transcribe.amazonaws.com) が この S3 バケットにオブジェクトを更新(s3:PutObject
)できるようにするためのバケットポリシーは不要です。
ジョブ完了時にメッセージ送信する SNS トピックの作成
SNS トピックを作成します。
また、S3 がこのトピックにメッセージ送信(SNS:Publish
)できるように、トピックポリシーを設定します。
{ "Version": "2008-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "*" }, "Action": "SNS:Publish", "Resource": "arn:aws:sns:us-east-1:205974338614:TOPIC-NAME", "Condition": { "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:TRANSCRIPTION-BUCKET-NAME" } } } ] }
S3 Events で SNS を呼び出す
ジョブ実行結果が S3 バケットに書き込まれたたときに、この SNS トピックにメッセージを送信するように S3 イベント設定します。
今回はバケット全体に対してObjectCreated:Put
をトリガーにしイベントを発火します。
ジョブ完了時に呼び出される Lambda 関数を作成
S3 イベント情報をもとに、S3 にあるジョブ実行結果を取得し、この結果をダンプするだけの Lambda 関数を用意します。
実サービスでは、ジョブ実行結果ファイルから文字起こしされたテキストを抽出し、要件に合わせた処理を行うことになるかと思います。
''' Lambda function to retrieve transcription data from S3 Invoked by S3 events -> SNS -> Lambda ''' import json from logging import getLogger, DEBUG import boto3 logger = getLogger(__name__) logger.setLevel(DEBUG) s3_client = boto3.client('s3') def lambda_handler(event, context): logger.debug(event) for sns_record in event['Records']: sns_message = json.loads(sns_record['Sns']['Message']) for s3_record in sns_message['Records']: s3 = s3_record['s3'] logger.info(s3) # ジョブ結果ファイルを S3 から取得 response = s3_client.get_object( Bucket=s3['bucket']['name'], Key=s3['object']['key']) logger.debug('transcription data') logger.debug(response['Body'].read())
この Lambda 関数に対して、先程作成した SNS トピックを購読させます。
文字起こしパイプラインを実行
ジョブの作成
今回は管理コンソールからジョブを作成します。
ジョブ作成画面の最下部に ”Choose output locationInfo” という入力欄が追加されています。
デフォルトでは「Amazon default bucket」が選択されていますが、「My own bucket」を選択すると、バケット名を入力できるようになります。
S3 バケットは、Amazon Transcribe ジョブと同じリージョンにある必要があります。 また、現時点ではバケット名のみ指定可能で、バケット内でのパスは指定できません。
ジョブ完了後の処理を確認
CloudWatch Logs にある Lambda の実行ログから、 SNS トピックに送信されたメッセージを確認します。
... [DEBUG] 2018-07-19T09:31:03.242Z 740d17d2-8b36-11e8-831d-f9c26748560e transcription data [DEBUG] 2018-07-19T09:31:03.264Z 740d17d2-8b36-11e8-831d-f9c26748560e b'{"jobName":"YOUR-JOB-NAME","accountId":"123456789012","results":{"transcripts":[{"transcript":"... ...
無事、Amazon Transcribe のジョブ実行結果を取得できています。
補足:S3 Events のメッセージ詳細
S3 Events → SNS → Lambda と連携したときに Lambda に渡るメッセージのデータ構造を確認します。
ジョブが完了し、S3 に PutObject されると、以下の様なメッセージが Lambda に渡ります。
{ "Records": [ { "EventSource": "aws:sns", "EventVersion": "1.0", "EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:SNS-TOPIC-NAME:e7ac2ee1-b4b4-4155-a292-95464c676c6f", "Sns": { "Type": "Notification", "MessageId": "0733f0c2-6c59-5a45-9607-a30ab5c16d29", "TopicArn": "arn:aws:sns:us-east-1:123456789012:SNS-TOPIC-NAME", "Subject": "Amazon S3 Notification", "Message": "...", ... } } ] }
12 行目から Amazon S3 イベントであることがわかります。
13 行目の Message
がイベント発火の実態です。
この Message
の値を抜粋したのが以下です。
{ "Records": [ { "eventVersion": "2.0", "eventSource": "aws:s3", "awsRegion": "us-east-1", "eventTime": "2018-07-18T05:59:52.123Z", "eventName": "ObjectCreated:Put", "userIdentity": {...}, "requestParameters": {...}, "responseElements": {...}, "s3": { "s3SchemaVersion": "1.0", "configurationId": "S3-EVENT-NAME", "bucket": { "name": "TRANSCRIPTION-BUCKET-NAME", "ownerIdentity": { "principalId": "XXX" }, "arn": "arn:aws:s3:::TRANSCRIPTION-BUCKET-NAME" }, "object": { "key": "TRANSCRIPTION-JOB-NAME.json", "size": 94004, "eTag": "5c0a6bc6b063e27d9058d5ff6b206af2", "sequencer": "005B4ED757C1EDA88B" } } } ] }
S3 バケットに対する ObjectCreated:Put
のためにイベントが呼び出され、Put されたバケット名、オブジェクトキーもイベント情報に含まれています。
最後に
Amazon Transcribe の文字起こし結果をユーザーの S3 バケットに出力できるようになりました。
この機能を S3 Events と連携し、ジョブ完了後は即座にジョブ実行結果ファイルを Lambda 関数で処理してみました。
ジョブ完了時にイベント・ドリブンに処理したい場合、今回紹介した方法とは別に CloudWatch Events を利用するアプローチもあります。
次のブログを参照ください。
参考
- https://aws.amazon.com/blogs/machine-learning/amazon-transcribe-now-lets-you-designate-your-own-amazon-s3-buckets-to-store-transcription-outputs/
- Amazon Transcribe Now Lets You Designate Your Own Amazon S3 Buckets to Store Transcription Outputs
- AWS Documentation » Transcribe » Developer Guide » API Reference » Actions » StartTranscriptionJob